Predicting Air Quality
Analyzing air quality data from Nairobi, Lagos, and Dar es Salaam and building a time series model to predict PM 2.5 readings throughout the day. A few rules must be followed:
import inspect
import time
import warnings
import matplotlib.pyplot as plt
import pandas as pd
import plotly.express as px
import seaborn as sns
from pprint import PrettyPrinter
from pymongo import MongoClient
from sklearn.metrics import mean_absolute_error
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
warnings.filterwarnings("ignore")
The data consists of device readings; since non-structured data can be difficult to read, using PrettyPrinter is a good idea.
# Pretty-printer for the semi-structured sensor documents
pp = PrettyPrinter(indent=2)
# Connecting to MongoDB database
client = MongoClient(host="localhost", port=27017)
# Listing its databases
pp.pprint(list(client.list_databases()))
[ {'empty': False, 'name': 'admin', 'sizeOnDisk': 40960},
{'empty': False, 'name': 'air-quality', 'sizeOnDisk': 6938624},
{'empty': False, 'name': 'config', 'sizeOnDisk': 98304},
{'empty': False, 'name': 'local', 'sizeOnDisk': 73728}]
The only database that is not part of the system-related features is air-quality
# Assigning the "air-quality" database to a variable
db = client["air-quality"]
# Using the "list_collections" method to print a list of the collections available in "db"
for c in db.list_collections():
    print(c['name'])
lagos system.buckets.lagos dar-es-salaam system.buckets.dar-es-salaam nairobi system.buckets.nairobi system.views
Again, there are several system collections; the only non-system ones are lagos, dar-es-salaam and nairobi
# Assigning the "nairobi" collection in db to the variable name nairobi
nairobi = db["nairobi"]
# Using the "count_documents" method to see how many documents are in the "nairobi" collection
# (the empty filter {} matches every document)
nairobi.count_documents({})
202212
# Using the "find_one" method to retrieve one document from the "nairobi" collection, and assign it to the variable name "result"
result = nairobi.find_one({})
# Inspect a sample document to learn the schema (P-value, metadata, timestamp)
pp.pprint(result)
{ 'P1': 39.67,
'_id': ObjectId('62a1510e5e1eb913597171ee'),
'metadata': { 'lat': -1.3,
'lon': 36.785,
'measurement': 'P1',
'sensor_id': 57,
'sensor_type': 'SDS011',
'site': 29},
'timestamp': datetime.datetime(2018, 9, 1, 0, 0, 2, 472000)}
By looking at how the data is organized, it is possible to notice a certain pattern regarding sensor location and readings
# Using the "distinct" method to determine how many sensor sites are included in the "nairobi" collection
nairobi.distinct("metadata.site")
[29, 6]
# Using the "count_documents" method to determine how many readings there are for each site in the "nairobi" collection
print("Documents from site 6:", nairobi.count_documents({"metadata.site":6}))
print("Documents from site 29:", nairobi.count_documents({"metadata.site":29}))
Documents from site 6: 70360 Documents from site 29: 131852
# Using the "distinct" method to determine how many types of measurements have been taken in the "nairobi" collection
nairobi.distinct("metadata.measurement")
['humidity', 'P2', 'temperature', 'P1']
# Using the "find" method to retrieve the PM 2.5 readings from all sites
# (limit(5) keeps the preview small)
result = nairobi.find({"metadata.measurement": "P2"}).limit(5)
pp.pprint(list(result))
[ { 'P2': 34.43,
'_id': ObjectId('62a1510f5e1eb9135971f279'),
'metadata': { 'lat': -1.3,
'lon': 36.785,
'measurement': 'P2',
'sensor_id': 57,
'sensor_type': 'SDS011',
'site': 29},
'timestamp': datetime.datetime(2018, 9, 1, 0, 0, 2, 472000)},
{ 'P2': 30.53,
'_id': ObjectId('62a1510f5e1eb9135971f27a'),
'metadata': { 'lat': -1.3,
'lon': 36.785,
'measurement': 'P2',
'sensor_id': 57,
'sensor_type': 'SDS011',
'site': 29},
'timestamp': datetime.datetime(2018, 9, 1, 0, 5, 3, 941000)},
{ 'P2': 22.8,
'_id': ObjectId('62a1510f5e1eb9135971f27b'),
'metadata': { 'lat': -1.3,
'lon': 36.785,
'measurement': 'P2',
'sensor_id': 57,
'sensor_type': 'SDS011',
'site': 29},
'timestamp': datetime.datetime(2018, 9, 1, 0, 10, 4, 374000)},
{ 'P2': 13.3,
'_id': ObjectId('62a1510f5e1eb9135971f27c'),
'metadata': { 'lat': -1.3,
'lon': 36.785,
'measurement': 'P2',
'sensor_id': 57,
'sensor_type': 'SDS011',
'site': 29},
'timestamp': datetime.datetime(2018, 9, 1, 0, 15, 4, 245000)},
{ 'P2': 16.57,
'_id': ObjectId('62a1510f5e1eb9135971f27d'),
'metadata': { 'lat': -1.3,
'lon': 36.785,
'measurement': 'P2',
'sensor_id': 57,
'sensor_type': 'SDS011',
'site': 29},
'timestamp': datetime.datetime(2018, 9, 1, 0, 20, 4, 869000)}]
# Count how many readings exist per measurement type at site 6
# using a two-stage aggregation pipeline: filter, then group-and-count.
pipeline = [
    {"$match": {"metadata.site": 6}},
    {"$group": {"_id": "$metadata.measurement", "count": {"$count": {}}}},
]
result = nairobi.aggregate(pipeline)
pp.pprint(list(result))
[ {'_id': 'P2', 'count': 18169},
{'_id': 'humidity', 'count': 17011},
{'_id': 'P1', 'count': 18169},
{'_id': 'temperature', 'count': 17011}]
# Same per-measurement count as above, but for site 29.
match_stage = {"$match": {"metadata.site": 29}}
group_stage = {"$group": {"_id": "$metadata.measurement", "count": {"$count": {}}}}
result = nairobi.aggregate([match_stage, group_stage])
pp.pprint(list(result))
[ {'_id': 'humidity', 'count': 33019},
{'_id': 'P2', 'count': 32907},
{'_id': 'temperature', 'count': 33019},
{'_id': 'P1', 'count': 32907}]
Using the find method to retrieve the PM 2.5 readings from site 29. Since it is intended to build a time series model there is no need for the metadata thus, limiting the results to the P2 and timestamp keys only is ideal
# Retrieve site-29 PM 2.5 readings, keeping only the fields the time
# series model needs: the reading itself and its timestamp.
fields = {"P2": 1, "timestamp": 1, "_id": 0}
result = nairobi.find(
    {"metadata.site": 29, "metadata.measurement": "P2"},
    projection=fields,
).limit(5)
pp.pprint(list(result))
[ {'P2': 34.43, 'timestamp': datetime.datetime(2018, 9, 1, 0, 0, 2, 472000)},
{'P2': 30.53, 'timestamp': datetime.datetime(2018, 9, 1, 0, 5, 3, 941000)},
{'P2': 22.8, 'timestamp': datetime.datetime(2018, 9, 1, 0, 10, 4, 374000)},
{'P2': 13.3, 'timestamp': datetime.datetime(2018, 9, 1, 0, 15, 4, 245000)},
{'P2': 16.57, 'timestamp': datetime.datetime(2018, 9, 1, 0, 20, 4, 869000)}]
# Reading records from "result" into the DataFrame df
result = nairobi.find(
    {"metadata.site": 29, "metadata.measurement": "P2"},
    projection={"P2": 1, "timestamp": 1, "_id": 0}
)
# Use the reading time as the index — required for time series resampling later
df = pd.DataFrame(result).set_index("timestamp")
df.head()
| P2 | |
|---|---|
| timestamp | |
| 2018-09-01 00:00:02.472 | 34.43 |
| 2018-09-01 00:05:03.941 | 30.53 |
| 2018-09-01 00:10:04.374 | 22.80 |
| 2018-09-01 00:15:04.245 | 13.30 |
| 2018-09-01 00:20:04.869 | 16.57 |
# Total number of P2 readings retrieved from site 29
len(df)
32907
Now that I have an idea about what I will be working on I think it would be ideal to build a wrangle function to treat my DataFrame. Furthermore it will be important to work with the correct timezone, "Africa/Nairobi"
def wrangle(collection):
    """Load site-29 PM 2.5 readings from a MongoDB collection.

    Parameters
    ----------
    collection : pymongo.collection.Collection
        MongoDB collection holding the sensor documents.

    Returns
    -------
    pandas.DataFrame
        "P2" readings indexed by timestamp, localized to "Africa/Nairobi".
    """
    # Only the reading and its timestamp are needed for the time series
    cursor = collection.find(
        {"metadata.site": 29, "metadata.measurement": "P2"},
        projection={"P2": 1, "timestamp": 1, "_id": 0},
    )
    frame = pd.DataFrame(cursor).set_index("timestamp")
    # Timestamps are stored in UTC; convert them to Nairobi local time
    frame.index = frame.index.tz_localize("UTC").tz_convert("Africa/Nairobi")
    return frame
# Re-open the connection and rebuild the DataFrame via the wrangle function
client = MongoClient(host="localhost", port=27017)
db = client["air-quality"]
nairobi = db["nairobi"]
df = wrangle(nairobi)
df.head()
| P2 | |
|---|---|
| timestamp | |
| 2018-09-01 03:00:02.472000+03:00 | 34.43 |
| 2018-09-01 03:05:03.941000+03:00 | 30.53 |
| 2018-09-01 03:10:04.374000+03:00 | 22.80 |
| 2018-09-01 03:15:04.245000+03:00 | 13.30 |
| 2018-09-01 03:20:04.869000+03:00 | 16.57 |
To help visualize the data it is always interesting to plot it; this time I will be using a boxplot to help me understand the distribution of PM2.5 readings
# Horizontal boxplot to inspect the spread and outliers of the readings
fig, ax = plt.subplots(figsize=(15, 6))
df["P2"].plot(kind="box", vert=False, title="Distribution of PM2.5 Readings", ax=ax);
It is easy to notice a few outliers on the dataset which ideally should be dropped
# Row count before outlier removal, for comparison below
len(df)
32907
# Drop readings outside the 10th-90th percentile range of "P2"
low, high = df["P2"].quantile([0.1, 0.9])
# Inclusive on both ends, same as Series.between(low, high)
mask_area = (df["P2"] >= low) & (df["P2"] <= high)
df = df[mask_area]
len(df)
26378
# Re-plot the distribution after percentile-based outlier removal
fig, ax = plt.subplots(figsize=(15, 6))
df["P2"].plot(kind="box", vert=False, title="Distribution of PM2.5 Readings", ax=ax);
Seems like the standard outlier-removal procedure might not be ideal for this dataset since it drops too many rows. Another good attempt would be to address the problem by doing a little background research on PM2.5. According to https://blissair.com/what-is-pm-2-5.htm a really high reading would be from 250 to 500, which is considered hazardous. So I will be using that as my maximum threshold and will be dropping anything above 500. (I will be repeating the wrangle function in order to make it easier to read and understand; in a real project I would only define it once with all the filter details.)
def wrangle(collection):
    """Load site-29 PM 2.5 readings, localize timestamps, and drop extreme values.

    Parameters
    ----------
    collection : pymongo.collection.Collection
        MongoDB collection holding the sensor documents.

    Returns
    -------
    pandas.DataFrame
        Cleaned "P2" readings indexed by Nairobi-local timestamps.
    """
    # Only the reading and its timestamp are needed for the time series
    cursor = collection.find(
        {"metadata.site": 29, "metadata.measurement": "P2"},
        projection={"P2": 1, "timestamp": 1, "_id": 0},
    )
    frame = pd.DataFrame(cursor).set_index("timestamp")
    # Timestamps are stored in UTC; convert them to Nairobi local time
    frame.index = frame.index.tz_localize("UTC").tz_convert("Africa/Nairobi")
    # Readings of 500+ are beyond the hazardous PM2.5 range; treat as sensor errors
    frame = frame[frame["P2"] < 500]
    return frame
# Rebuild the DataFrame with the threshold-based outlier filter applied
client = MongoClient(host="localhost", port=27017)
db = client["air-quality"]
nairobi = db["nairobi"]
df = wrangle(nairobi)
df.head()
| P2 | |
|---|---|
| timestamp | |
| 2018-09-01 03:00:02.472000+03:00 | 34.43 |
| 2018-09-01 03:05:03.941000+03:00 | 30.53 |
| 2018-09-01 03:10:04.374000+03:00 | 22.80 |
| 2018-09-01 03:15:04.245000+03:00 | 13.30 |
| 2018-09-01 03:20:04.869000+03:00 | 16.57 |
# Distribution after the < 500 threshold filter
fig, ax = plt.subplots(figsize=(15, 6))
df["P2"].plot(kind="box", vert=False, title="Distribution of PM2.5 Readings", ax=ax);
# Time series plot of the "P2" readings in df
fig, ax = plt.subplots(figsize=(15, 6))
df["P2"].plot(xlabel="Time", ylabel="PM2.5", title="PM2.5 Time Series", ax=ax);
The given dataset has readings every 5 minutes. However, for my model I will try to predict the readings at every hour. Also, when working with timeseries it is interesting to use a forward fill to impute any missing values.
# Hourly mean readings, forward-filling gaps.
# Series.ffill() replaces the deprecated fillna(method="ffill").
df["P2"].resample("1H").mean().ffill().head()
timestamp 2018-09-01 03:00:00+03:00 17.541667 2018-09-01 04:00:00+03:00 15.800000 2018-09-01 05:00:00+03:00 11.420000 2018-09-01 06:00:00+03:00 11.614167 2018-09-01 07:00:00+03:00 17.665000 Freq: H, Name: P2, dtype: float64
def wrangle(collection, resample_rule="1H"):
    """Load, clean, and resample site-29 PM 2.5 readings from MongoDB.

    Parameters
    ----------
    collection : pymongo.collection.Collection
        MongoDB collection holding the sensor documents.
    resample_rule : str, default "1H"
        Pandas offset alias giving the resampling window.

    Returns
    -------
    pandas.DataFrame
        Single-column ("P2") DataFrame of mean readings per window,
        indexed by Nairobi-local timestamps.
    """
    # Retrieving PM 2.5 readings from site 29
    results = collection.find(
        {"metadata.site": 29, "metadata.measurement": "P2"},
        projection={"P2": 1, "timestamp": 1, "_id": 0},
    )
    # Reading records from "results" into the DataFrame df
    df = pd.DataFrame(results).set_index("timestamp")
    # Localize timezone (timestamps are stored in UTC)
    df.index = df.index.tz_localize("UTC").tz_convert("Africa/Nairobi")
    # Remove outliers: 500+ is beyond the hazardous PM2.5 range
    df = df[df["P2"] < 500]
    # Resample to the requested window and forward-fill missing values.
    # Series.ffill() replaces the deprecated fillna(method="ffill").
    df = df["P2"].resample(resample_rule).mean().ffill().to_frame()
    return df
# Rebuild the DataFrame, now resampled to hourly means
client = MongoClient(host="localhost", port=27017)
db = client["air-quality"]
nairobi = db["nairobi"]
df = wrangle(nairobi)
df.head()
| P2 | |
|---|---|
| timestamp | |
| 2018-09-01 03:00:00+03:00 | 17.541667 |
| 2018-09-01 04:00:00+03:00 | 15.800000 |
| 2018-09-01 05:00:00+03:00 | 11.420000 |
| 2018-09-01 06:00:00+03:00 | 11.614167 |
| 2018-09-01 07:00:00+03:00 | 17.665000 |
Plotting a rolling average might be a good idea to help me identify trends, maybe seasonal patterns
# Creating the canvas
fig, ax = plt.subplots(figsize=(15, 6))
# Rolling average of a week (168 time periods, 7*24 hourly readings)
df["P2"].rolling(168).mean().plot(ax=ax, ylabel="PM2.5", title="Weekly rolling average");
For this dataset I will be using the previous reading to try to predict future readings thus, creating a lag feature could be interesting
# Lag column shifting the value one place (previous hour's reading)
df["P2.L1"] = df["P2"].shift(1)
# Shifting leaves the first row without a lag value, so drop NaNs before inspecting
df.dropna().head()
| P2 | P2.L1 | |
|---|---|---|
| timestamp | ||
| 2018-09-01 04:00:00+03:00 | 15.800000 | 17.541667 |
| 2018-09-01 05:00:00+03:00 | 11.420000 | 15.800000 |
| 2018-09-01 06:00:00+03:00 | 11.614167 | 11.420000 |
| 2018-09-01 07:00:00+03:00 | 17.665000 | 11.614167 |
| 2018-09-01 08:00:00+03:00 | 21.016667 | 17.665000 |
Now I will be taking a look at the correlation matrix just to check if everything is going according to plan
# Correlation matrix between the readings and their one-hour lag;
# computed once and reused for the heatmap below.
cor = df.corr()
cor
| P2 | P2.L1 | |
|---|---|---|
| P2 | 1.000000 | 0.650679 |
| P2.L1 | 0.650679 | 1.000000 |
# Heatmap of the correlation matrix computed above
sns.heatmap(cor);
An even better way to see whether I have predictive power from previous readings is to plot the readings against each other
# Scatter of each reading against its one-hour lag; points near the
# identity line indicate strong autocorrelation.
fig, ax = plt.subplots(figsize=(6, 6))
ax.scatter(x=df["P2.L1"], y=df["P2"])
# Dashed y = x reference line
ax.plot([0, 120], [0, 120], linestyle="--", color="red")
# Axis and title details (ax is the current axes, so this matches plt.*)
ax.set_xlabel("P2.L1")
ax.set_ylabel("P2")
ax.set_title("PM2.5 Autocorrelation");
Seems like the dataset is good enough to start building
def wrangle(collection, resample_rule="1H"):
    """Load, clean, and resample site-29 PM 2.5 readings from MongoDB.

    Parameters
    ----------
    collection : pymongo.collection.Collection
        MongoDB collection holding the sensor documents.
    resample_rule : str, default "1H"
        Pandas offset alias giving the resampling window.

    Returns
    -------
    pandas.Series
        Mean "P2" reading per window, indexed by Nairobi-local timestamps.
        (Note: a Series, not a DataFrame — the ARIMA model below expects
        a univariate series.)
    """
    # Retrieving PM 2.5 readings from site 29
    results = collection.find(
        {"metadata.site": 29, "metadata.measurement": "P2"},
        projection={"P2": 1, "timestamp": 1, "_id": 0},
    )
    # Read results into DataFrame
    df = pd.DataFrame(list(results)).set_index("timestamp")
    # Localize timezone (timestamps are stored in UTC)
    df.index = df.index.tz_localize("UTC").tz_convert("Africa/Nairobi")
    # Remove outliers: 500+ is beyond the hazardous PM2.5 range
    df = df[df["P2"] < 500]
    # Resample and forward-fill.
    # Series.ffill() replaces the deprecated fillna(method="ffill").
    return df["P2"].resample(resample_rule).mean().ffill()
# The model target: hourly mean PM2.5 readings as a Series
y = wrangle(nairobi)
y.head()
timestamp 2018-09-01 03:00:00+03:00 17.541667 2018-09-01 04:00:00+03:00 15.800000 2018-09-01 05:00:00+03:00 11.420000 2018-09-01 06:00:00+03:00 11.614167 2018-09-01 07:00:00+03:00 17.665000 Freq: H, Name: P2, dtype: float64
As the lag increases the predictive power decreases. The ACF plot will help me visualize that
# Create canvas
fig, ax = plt.subplots(figsize=(15, 6))
# Plot ACF: correlation of the series with each of its lags
plot_acf(y, ax=ax)
# Update axis labels
plt.xlabel("Lag [hours]")
plt.ylabel("Correlation Coefficient");
The PACF will help me determine the best lag values since it removes the echos (previous correlated values)
# Create canvas
fig, ax = plt.subplots(figsize=(15, 6))
# Plot PACF: lag correlations with shorter-lag effects removed
plot_pacf(y, ax=ax)
# Update axis labels
plt.xlabel("Lag [hours]")
plt.ylabel("Correlation Coefficient");
From previous plots it is noticeable that October and November have a similar pattern. I will be trying to use them as a test case for my model
# Train dataset from October
y_train = y[(y.index >= "2018-10-01") & (y.index < "2018-11-01")]
# Test dataset from November
y_test = y[(y.index >= "2018-11-01") & (y.index < "2018-11-02")]
# The overall mean will the the baseline
y_train_mean = y_train.mean()
# One value for each row
y_pred_baseline = [y_train_mean] * len(y_train)
mae_baseline = mean_absolute_error(y_train, y_pred_baseline)
print("Mean P2 Reading:", round(y_train_mean, 2))
print("Baseline MAE:", round(mae_baseline, 2))
Mean P2 Reading: 10.12 Baseline MAE: 4.17
The model will be based on the ARMA model, so I need to define p and q values
# Creating ranges for possible 𝑝 and 𝑞 values.
# AR orders 0, 8, 16, 24 — the step of 8 keeps the grid search tractable
p_params = range(0, 25, 8)
# MA orders 0, 1, 2
q_params = range(0, 3, 1)
Train a model with every combination of hyperparameters in "p_params" and "q_params". Every time the model is trained, the mean absolute error is calculated and then saved to a dictionary
# Grid search over (p, q); keys of mae_grid are p values, each mapping to
# a list of training MAEs, one per q value.
mae_grid = {}
for p in p_params:
    mae_grid[p] = []
    for q in q_params:
        order = (p, 0, q)
        # Time the fit so computational cost can inform model choice later
        start_time = time.time()
        model = ARIMA(y_train, order=order).fit()
        elapsed_time = round(time.time() - start_time, 2)
        print(f"Trained ARIMA {order} in {elapsed_time} seconds.")
        # In-sample (training) predictions and their MAE
        y_pred = model.predict()
        mae_grid[p].append(mean_absolute_error(y_train, y_pred))
print()
print(mae_grid)
Trained ARIMA (0, 0, 0) in 0.33 seconds.
Trained ARIMA (0, 0, 1) in 0.81 seconds.
Trained ARIMA (0, 0, 2) in 2.2 seconds.
Trained ARIMA (8, 0, 0) in 11.2 seconds.
Trained ARIMA (8, 0, 1) in 43.4 seconds.
Trained ARIMA (8, 0, 2) in 78.09 seconds.
Trained ARIMA (16, 0, 0) in 39.01 seconds.
Trained ARIMA (16, 0, 1) in 132.2 seconds.
Trained ARIMA (16, 0, 2) in 195.2 seconds.
Trained ARIMA (24, 0, 0) in 102.41 seconds.
Trained ARIMA (24, 0, 1) in 137.71 seconds.
Trained ARIMA (24, 0, 2) in 249.29 seconds.
{0: [4.171460443827197, 3.3506427433555537, 3.1057222587888647], 8: [2.9384480570404223, 2.9149008203151663, 2.908046094677133], 16: [2.9201084726122, 2.929436207635203, 2.913638894139686], 24: [2.914390327277196, 2.9136013252035013, 2.89792511429827]}
Organizing all the MAEs from above in a DataFrame named mae_df. Each row represents a possible value for 𝑞 and each column represents a possible value for 𝑝.
# Rows are q values, columns are p values
mae_df = pd.DataFrame(mae_grid)
mae_df.round(4)
| 0 | 8 | 16 | 24 | |
|---|---|---|---|---|
| 0 | 4.1715 | 2.9384 | 2.9201 | 2.9144 |
| 1 | 3.3506 | 2.9149 | 2.9294 | 2.9136 |
| 2 | 3.1057 | 2.9080 | 2.9136 | 2.8979 |
Creating a map of the values in mae_grid
# Visual summary of the grid search: darker cells = higher MAE
sns.heatmap(mae_df, cmap="Blues")
plt.xlabel("p values")
plt.ylabel("q values")
plt.title("ARMA Grid Search (Criterion: MAE)");
Seems like all models from the second column onward performed similarly. However, their computational costs vary quite a bit, so an interesting approach would be to choose the one which took the least amount of time to compute
Use the plot_diagnostics method to check the residuals for the model
# Residual diagnostics for the last fitted model
fig, ax = plt.subplots(figsize=(15, 12))
model.plot_diagnostics(fig=fig);
I will be using a walk-forward validation for the model for the entire test set y_test, storing the model's predictions in the Series y_pred_wfv
# Walk-forward validation: refit on all available history, forecast one
# step ahead, then append the observed value before predicting the next hour.
y_pred_wfv = pd.Series(dtype="float64")
# Copying the train set as the initial history
history = y_train.copy()
for i in range(len(y_test)):
    # ARIMA(8, 0, 1): best trade-off between MAE and training time in the grid search
    model = ARIMA(history, order=(8, 0, 1)).fit()
    # One-step-ahead forecast
    next_pred = model.forecast()
    # Series.append was removed in pandas 2.0; pd.concat is the supported replacement
    y_pred_wfv = pd.concat([y_pred_wfv, next_pred])
    history = pd.concat([history, y_test[next_pred.index]])
The MAE should easily beat the baseline
# Out-of-sample MAE of the walk-forward predictions against the test set
test_mae = mean_absolute_error(y_test, y_pred_wfv)
print("Test MAE (walk forward validation):", round(test_mae, 2))
Test MAE (walk forward validation): 1.67
Finally I will be plotting a Test X Pred fig
# Interactive comparison of observed test values vs. walk-forward predictions
df_predictions = pd.DataFrame(
    {
        "y_test": y_test,
        "y_pred_wfv": y_pred_wfv
    }
)
fig = px.line(df_predictions, labels={"value": "PM2.5"})
fig.show()